package org.budo.warehouse.logic.producer;

import java.util.List;

import org.budo.canal.message.handler.RowChangeUtil;
import org.budo.support.lang.util.BooleanUtil;
import org.budo.support.lang.util.NumberUtil;
import org.budo.support.lang.util.StringUtil;
import org.budo.warehouse.logic.api.DataEntry;

import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

/**
 * {@link DataEntry} implementation backed by a Canal {@link Entry}.
 * <p>
 * Schema, table and event type are read from the entry header. Row and column
 * accessors work on the {@link RowData} list obtained from the entry's store
 * value; that list is fetched on first use and cached in {@code _rowDataList}.
 *
 * @author limingwei
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class CanalDataEntry implements DataEntry {
    /** The wrapped Canal entry; may be {@code null} when built via the no-args constructor. */
    private Entry entry;

    private Integer sourceDataNodeId;

    /** Cache for {@link #getRowDataList()}; {@code null} until the first lookup. */
    private List<RowData> _rowDataList;

    /**
     * Creates a data entry for the given Canal entry and source data node.
     *
     * @param entry      the Canal entry to wrap
     * @param dataNodeId id stored as {@code sourceDataNodeId}
     */
    public CanalDataEntry(Entry entry, Integer dataNodeId) {
        this.setEntry(entry);
        this.setSourceDataNodeId(dataNodeId);
    }

    /**
     * Replaces the wrapped entry and drops any cached row data, so the next
     * {@link #getRowDataList()} call reads the rows of the new entry instead of
     * returning the rows of the previous one.
     * <p>
     * Declared explicitly (rather than relying on the class-level {@code @Setter})
     * because a plain field assignment would leave the cache stale.
     *
     * @param entry the new Canal entry
     */
    public void setEntry(Entry entry) {
        this.entry = entry;
        this._rowDataList = null;
    }

    /** @return the name of the event type found in the entry header */
    @Override
    public String getEventType() {
        return entry.getHeader().getEventType().name();
    }

    /** @return the schema name found in the entry header */
    @Override
    public String getSchemaName() {
        return entry.getHeader().getSchemaName();
    }

    /** @return the table name found in the entry header */
    @Override
    public String getTableName() {
        return entry.getHeader().getTableName();
    }

    /** @return the number of rows in {@link #getRowDataList()} */
    @Override
    public Integer getRowCount() {
        return this.getRowDataList().size();
    }

    /**
     * Returns the rows of this entry. On the first call the entry's store value is
     * handed to {@code RowChangeUtil.rowChangeParseFromByteString} and the resulting
     * row list is cached; later calls return the cached list.
     *
     * @return the row data list of the wrapped entry
     */
    public List<RowData> getRowDataList() {
        if (null != this._rowDataList) {
            return this._rowDataList;
        }

        RowChange rowChange = RowChangeUtil.rowChangeParseFromByteString(this.getEntry().getStoreValue());
        List<RowData> rowDataList = rowChange.getRowDatasList();
        return this._rowDataList = rowDataList;
    }

    /**
     * Returns the column count of a row: the after-image count for inserts, the
     * before-image count for deletes, and for any other event type the common count
     * of both images.
     *
     * @param rowIndex index into {@link #getRowDataList()}
     * @return the number of columns of the row
     * @throws RuntimeException if the event is neither insert nor delete and the
     *                          before/after column counts differ
     */
    @Override
    public Integer getColumnCount(Integer rowIndex) {
        RowData rowData = this.getRowDataList().get(rowIndex);

        int after = rowData.getAfterColumnsCount();
        if (this.isInsertEvent()) {
            return after;
        }

        int before = rowData.getBeforeColumnsCount();
        if (this.isDeleteEvent()) {
            return before;
        }

        // update
        if (!NumberUtil.equals(before, after)) {
            throw new RuntimeException("#75 this=" + this + ", rowIndex=" + rowIndex + ", before=" + before + ", after=" + after);
        }

        return before;
    }

    /**
     * Tells whether a column is flagged as key. Deletes read the before image,
     * inserts the after image; any other event type reads both and requires them
     * to agree.
     *
     * @param rowIndex    index into {@link #getRowDataList()}
     * @param columnIndex index of the column within the row
     * @return the column's key flag
     * @throws RuntimeException if the event is neither insert nor delete and the
     *                          before/after key flags differ
     */
    @Override
    public Boolean getColumnIsKey(Integer rowIndex, Integer columnIndex) {
        RowData rowData = this.getRowDataList().get(rowIndex);

        if (this.isDeleteEvent()) {
            return rowData.getBeforeColumns(columnIndex).getIsKey();
        }

        if (this.isInsertEvent()) {
            return rowData.getAfterColumns(columnIndex).getIsKey();
        }

        boolean before = rowData.getBeforeColumns(columnIndex).getIsKey();
        boolean after = rowData.getAfterColumns(columnIndex).getIsKey();
        if (!BooleanUtil.equals(before, after)) {
            throw new RuntimeException("#102 this=" + this + ", rowIndex=" + rowIndex + ", before=" + before + ", after=" + after);
        }

        return before;
    }

    /**
     * Returns the name of a column. Deletes read the before image, inserts the
     * after image; any other event type reads both and requires them to agree.
     *
     * @param rowIndex    index into {@link #getRowDataList()}
     * @param columnIndex index of the column within the row
     * @return the column name
     * @throws RuntimeException if the event is neither insert nor delete and the
     *                          before/after names differ
     */
    @Override
    public String getColumnName(Integer rowIndex, Integer columnIndex) {
        RowData rowData = this.getRowDataList().get(rowIndex);

        if (this.isDeleteEvent()) {
            return rowData.getBeforeColumns(columnIndex).getName();
        }

        if (this.isInsertEvent()) {
            return rowData.getAfterColumns(columnIndex).getName();
        }

        String before = rowData.getBeforeColumns(columnIndex).getName();
        String after = rowData.getAfterColumns(columnIndex).getName();
        if (!StringUtil.equals(before, after)) {
            throw new RuntimeException("#115 this=" + this + ", rowIndex=" + rowIndex + ", before=" + before + ", after=" + after);
        }

        return before;
    }

    /**
     * Returns the after-image value of a column.
     *
     * @param rowIndex    index into {@link #getRowDataList()}
     * @param columnIndex index of the column within the after image
     * @return the value, or {@code null} when {@code columnIndex} is at or beyond the
     *         after-image column count or the column is flagged as null
     */
    @Override
    public String getColumnValueAfter(Integer rowIndex, Integer columnIndex) {
        RowData rowData = this.getRowDataList().get(rowIndex);
        int afterColumnsCount = rowData.getAfterColumnsCount();
        if (columnIndex >= afterColumnsCount) {
            return null;
        }

        Column afterColumn = rowData.getAfterColumns(columnIndex);
        if (afterColumn.getIsNull()) {
            return null;
        }

        return afterColumn.getValue();
    }

    /**
     * Returns the before-image value of a column.
     *
     * @param rowIndex    index into {@link #getRowDataList()}
     * @param columnIndex index of the column within the before image
     * @return the value, or {@code null} when {@code columnIndex} is at or beyond the
     *         before-image column count or the column is flagged as null
     */
    @Override
    public String getColumnValueBefore(Integer rowIndex, Integer columnIndex) {
        RowData rowData = this.getRowDataList().get(rowIndex);
        int beforeColumnsCount = rowData.getBeforeColumnsCount();
        if (columnIndex >= beforeColumnsCount) {
            return null;
        }

        Column beforeColumn = rowData.getBeforeColumns(columnIndex);
        if (beforeColumn.getIsNull()) {
            return null;
        }

        return beforeColumn.getValue();
    }

    /**
     * Describes this entry by its header and row data. When no entry has been set
     * (for example right after the no-args constructor) a short placeholder is
     * returned instead of failing with a {@link NullPointerException}.
     */
    @Override
    public String toString() {
        if (null == this.entry) {
            return super.toString() + ", " + "#45 entry=null";
        }

        return super.toString() + ", " + "#45 entry.header=" + entry.getHeader() //
                + ", rowChange=" + RowChangeUtil.rowChangeToString(this.getRowDataList());
    }

    /** Whether the header's event type name equals "insert", ignoring case. */
    private boolean isInsertEvent() {
        return "insert".equalsIgnoreCase(this.getEventType());
    }

    /** Whether the header's event type name equals "delete", ignoring case. */
    private boolean isDeleteEvent() {
        return "delete".equalsIgnoreCase(this.getEventType());
    }
}